package com.stars.easyms.base.util;

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * <p>className: EasyMsThreadPoolExecutor</p>
 * <p>description: EasyMs线程池：去掉队列无需等待，默认只开启核心线程，当核心线程全部工作时开启非核心线程执行，当非核心线程空闲时间超过最大空闲时间时关闭线程</p>
 *
 * @author guoguifang
 * @date 2019-09-19 16:43
 * @since 1.3.2
 */
public final class EasyMsThreadPoolExecutor extends AbstractExecutorService {

    private final ReentrantLock mainLock = new ReentrantLock();

    private final ReentrantLock poolSizeLock = new ReentrantLock();

    private final Set<Worker> workers = new LinkedHashSet<>();

    private int largestPoolSize;

    private volatile long keepAliveTime;

    private volatile int corePoolSize;

    private volatile int maxPoolSize;

    private volatile ThreadFactory threadFactory;

    private volatile boolean shutdown;

    private final AtomicLong completedTaskCount = new AtomicLong();

    /**
     * 线程池构造方法：当创建线程池时默认预先创建corePoolSize个线程并将线程置为等待模式
     *
     * @param corePoolSize  核心线程数量
     * @param maxPoolSize   最大线程数量
     * @param keepAliveTime 非核心线程最大空闲时间
     * @param timeUnit      非核心线程最大空闲时间的时间单位
     * @param threadFactory 线程工厂对象
     */
    public EasyMsThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, ThreadFactory threadFactory) {
        if (corePoolSize < 0 || maxPoolSize <= 0 || maxPoolSize < corePoolSize || keepAliveTime < 0) {
            throw new IllegalArgumentException();
        }
        this.corePoolSize = corePoolSize;
        this.maxPoolSize = maxPoolSize;
        this.keepAliveTime = timeUnit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.startAllCoreThreads();
    }

    /**
     * 线程池构造方法：当创建线程池时默认预先创建corePoolSize个线程并将线程置为等待模式
     *
     * @param corePoolSize  核心线程数量
     * @param maxPoolSize   最大线程数量
     * @param keepAliveTime 非核心线程最大空闲时间
     * @param timeUnit      非核心线程最大空闲时间的时间单位
     */
    public EasyMsThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit) {
        this(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, new DefaultEasyMsThreadFactory());
    }

    @Override
    public void shutdown() {
        this.mainLock.lock();
        try {
            workers.clear();
            shutdown = true;
        } finally {
            this.mainLock.unlock();
        }
    }

    @Override
    public List<Runnable> shutdownNow() {
        this.mainLock.lock();
        try {
            List<Runnable> runnableList = new ArrayList<>();
            for (Worker worker : workers) {
                if (worker.task != null) {
                    runnableList.add(worker.task);
                }
            }
            shutdown = true;
            return runnableList;
        } finally {
            this.mainLock.unlock();
        }
    }

    @Override
    public boolean isShutdown() {
        return shutdown;
    }

    @Override
    public boolean isTerminated() {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        Worker worker = getExecutableWorker();
        if (worker == null) {
            command.run();
            return;
        }
        worker.setTask(command);
    }

    /**
     * 当创建线程池时默认预先创建corePoolSize个线程并将线程置为等待模式
     */
    private void startAllCoreThreads() {
        Worker worker;
        do {
            worker = addWorker(true);
        } while (worker != null);
    }

    /**
     * 新增线程
     *
     * @param core 是否核心线程
     * @return 新增成功返回创建成功的对象，否则返回null
     */
    private Worker addWorker(boolean core) {
        this.mainLock.lock();
        try {
            // 先判断已创建线程数量是否已经超过核心线程/最大线程数量
            int workerCount = this.workers.size();
            int poolSize = core ? this.corePoolSize : this.maxPoolSize;
            if (workerCount >= poolSize) {
                return null;
            }
            Worker worker = new Worker(null, core);
            worker.thread.start();
            if (this.workers.add(worker)) {
                if (workerCount + 1 > this.largestPoolSize) {
                    this.largestPoolSize = workerCount + 1;
                }
                return worker;
            }
            return null;
        } finally {
            this.mainLock.unlock();
        }
    }

    /**
     * 获取当前可执行任务的线程
     */
    private Worker getExecutableWorker() {
        this.poolSizeLock.lock();
        this.mainLock.lock();
        try {
            if (!shutdown) {
                for (Worker worker : this.workers) {
                    if (worker.task == null) {
                        worker.allowRemove = false;
                        return worker;
                    }
                }
                if (this.workers.size() < this.maxPoolSize) {
                    Worker worker = addWorker(false);
                    if (worker != null) {
                        worker.allowRemove = false;
                        return worker;
                    }
                }
            }
            return null;
        } finally {
            this.mainLock.unlock();
            this.poolSizeLock.unlock();
        }
    }

    /**
     * 线程循环执行，当没有可执行任务时线程处于waiting状态并释放锁和CPU，当非核心线程超时时关闭线程
     */
    private void runWorker(Worker worker) {
        worker.unlock();
        while (true) {
            if (worker.task == null) {
                if (worker.isCore || this.workers.size() <= this.maxPoolSize) {
                    worker.awaitNanos(this.keepAliveTime);
                }
                if (processNonCoreWorkerExit(worker)) {
                    break;
                }
                continue;
            }
            worker.lock();
            try {
                worker.task.run();
            } catch (RuntimeException | Error x) {
                throw x;
            } catch (Throwable x) {
                throw new Error(x);
            } finally {
                worker.terminate();
                worker.unlock();
            }
        }
    }

    /**
     * 处理线程退出操作
     */
    private boolean processNonCoreWorkerExit(Worker worker) {
        if (worker.task == null) {
            this.mainLock.lock();
            try {
                boolean isExit = !worker.isCore && worker.allowRemove && worker.task == null
                        && (worker.isTimeout(this.keepAliveTime) || this.workers.size() > this.maxPoolSize);
                if (isExit) {
                    completedTaskCount.getAndAdd(worker.completedTasks.get());
                    this.workers.remove(worker);
                    return true;
                }
                return false;
            } finally {
                this.mainLock.unlock();
            }
        }
        return false;
    }

    /**
     * 打断所有空闲线程的等待
     */
    public void interruptIdleWorkers() {
        this.mainLock.lock();
        try {
            for (Worker worker : this.workers) {
                if (worker.isWait || worker.getIdleTimeNanos() > 0) {
                    worker.interruptIfStarted();
                }
            }
        } finally {
            this.mainLock.unlock();
        }
    }

    /**
     * 设置分布式线程池核心线程数量
     *
     * @param corePoolSize 核心线程数量
     */
    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0 || corePoolSize > this.maxPoolSize) {
            throw new IllegalArgumentException();
        }
        int diff = corePoolSize - this.corePoolSize;
        if (diff != 0) {
            this.poolSizeLock.lock();
            try {
                this.mainLock.lock();
                try {
                    this.corePoolSize = corePoolSize;
                    int temp = 0;
                    for (Worker worker : this.workers) {
                        worker.isCore = temp++ < corePoolSize;
                    }
                    if (diff < 0) {
                        interruptIdleWorkers();
                    } else if (corePoolSize > this.workers.size()) {
                        startAllCoreThreads();
                    }
                } finally {
                    this.mainLock.unlock();
                }
            } finally {
                this.poolSizeLock.unlock();
            }
        }
    }

    /**
     * 设置分布式线程池最大线程数量
     *
     * @param maxPoolSize 最大线程数量
     */
    public void setMaxPoolSize(int maxPoolSize) {
        if (maxPoolSize <= 0 || maxPoolSize < this.corePoolSize) {
            throw new IllegalArgumentException();
        }
        this.poolSizeLock.lock();
        try {
            this.maxPoolSize = maxPoolSize;
            if (this.workers.size() > maxPoolSize) {
                interruptIdleWorkers();
            }
        } finally {
            this.poolSizeLock.unlock();
        }
    }

    /**
     * 获取核心线程池数量
     */
    public int getCorePoolSize() {
        return corePoolSize;
    }

    /**
     * 获取最大线程池数量
     */
    public int getMaxPoolSize() {
        return maxPoolSize;
    }

    /**
     * 获取历史最大线程池数量
     */
    public int getLargestPoolSize() {
        return largestPoolSize;
    }

    /**
     * 获取当前激活线程数
     *
     * @return the number of active threads
     */
    public int getActiveCount() {
        return workers.size();
    }

    /**
     * 获取当前任务执行数量
     *
     * @return the number of current tasks
     */
    public long getCurrentTaskCount() {
        this.mainLock.lock();
        try {
            long n = 0;
            for (Worker worker : this.workers) {
                if (worker.task != null) {
                    n++;
                }
            }
            return n;
        } finally {
            this.mainLock.unlock();
        }
    }

    /**
     * 获取总任务执行数量，包括当前正在执行的任务和历史执行数量
     *
     * @return the number of total tasks
     */
    public long getTotalTaskCount() {
        this.mainLock.lock();
        try {
            long n = completedTaskCount.get();
            for (Worker worker : this.workers) {
                n += worker.completedTasks.get();
                if (worker.task != null) {
                    n++;
                }
            }
            return n;
        } finally {
            this.mainLock.unlock();
        }
    }

    /**
     * 获取已完成任务数量
     *
     * @return the number of completed tasks
     */
    public long getCompletedTaskCount() {
        this.mainLock.lock();
        try {
            long n = completedTaskCount.get();
            for (Worker worker : this.workers) {
                n += worker.completedTasks.get();
            }
            return n;
        } finally {
            this.mainLock.unlock();
        }
    }

    /**
     * 同时修改分布式线程池核心线程数量与最大线程数量
     *
     * @param corePoolSize 核心线程数量
     * @param maxPoolSize  最大线程数量
     */
    public void setPoolSize(int corePoolSize, int maxPoolSize) {
        if (corePoolSize < 0 || maxPoolSize <= 0 || maxPoolSize < corePoolSize) {
            throw new IllegalArgumentException();
        }
        if (corePoolSize > this.maxPoolSize) {
            setMaxPoolSize(maxPoolSize);
            setCorePoolSize(corePoolSize);
        } else {
            setCorePoolSize(corePoolSize);
            setMaxPoolSize(maxPoolSize);
        }
    }

    /**
     * 设置非核心线程最大空闲时间
     *
     * @param keepAliveTime 非核心线程最大空闲时间
     * @param timeUnit      非核心线程最大空闲时间的时间单位
     */
    public void setKeepAliveTime(long keepAliveTime, TimeUnit timeUnit) {
        if (keepAliveTime < 0) {
            throw new IllegalArgumentException();
        }
        this.keepAliveTime = timeUnit.toNanos(keepAliveTime);
    }

    /**
     * 得到线程工厂
     */
    private ThreadFactory getThreadFactory() {
        return this.threadFactory;
    }

    final class Worker extends AbstractQueuedSynchronizer implements Runnable {

        private final ReentrantLock mainLock = new ReentrantLock();

        private final Condition termination = mainLock.newCondition();

        private final Thread thread;

        private volatile Runnable task;

        private volatile boolean isCore;

        private volatile long idleStart;

        private volatile boolean allowRemove;

        private volatile boolean isWait;

        private final AtomicLong completedTasks = new AtomicLong();

        private Worker(Runnable task, boolean isCore) {
            setState(-1);
            this.thread = getThreadFactory().newThread(this);
            this.task = task;
            this.isCore = isCore;
            if (task == null) {
                this.idleStart = System.nanoTime();
            }
        }

        @Override
        public void run() {
            runWorker(this);
        }

        private void setTask(Runnable task) {
            this.mainLock.lock();
            try {
                this.task = task;
                this.idleStart = 0L;
                interruptIfStarted();
            } finally {
                this.mainLock.unlock();
            }
        }

        private void terminate() {
            this.mainLock.lock();
            try {
                if (this.task != null) {
                    this.task = null;
                    this.idleStart = System.nanoTime();
                    this.allowRemove = true;
                    this.completedTasks.getAndIncrement();
                }
            } finally {
                this.mainLock.unlock();
            }
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        @Override
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        private void lock() {
            acquire(1);
        }

        private void unlock() {
            release(1);
        }

        private void awaitNanos(long nanosKeepAliveTime) {
            this.mainLock.lock();
            try {
                if (this.isCore) {
                    isWait = true;
                    this.termination.await();
                } else {
                    long nanosTimeout = getNanosTimeout(nanosKeepAliveTime);
                    if (nanosTimeout > 0) {
                        isWait = true;
                        this.termination.awaitNanos(nanosTimeout);
                    }
                }
            } catch (InterruptedException ignore) {
                Thread.currentThread().interrupt();
            } finally {
                isWait = false;
                this.mainLock.unlock();
            }
        }

        private long getNanosTimeout(long nanosKeepAliveTime) {
            if (this.idleStart == 0) {
                return 0;
            }
            return nanosKeepAliveTime + this.idleStart - System.nanoTime();
        }

        private boolean isTimeout(long nanosKeepAliveTime) {
            return this.idleStart != 0L && System.nanoTime() - this.idleStart > nanosKeepAliveTime;
        }

        private void interruptIfStarted() {
            if (isWait && this.task != null) {
                this.mainLock.lock();
                try {
                    this.termination.signal();
                } finally {
                    this.mainLock.unlock();
                }
            }
        }

        public String getName() {
            return this.thread.getName();
        }

        private long getIdleTimeNanos() {
            if (this.idleStart == 0) {
                return 0;
            }
            return System.nanoTime() - this.idleStart;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof Worker) {
                Worker worker = (Worker) obj;
                if (worker.hashCode() == this.hashCode()) {
                    return true;
                }
            }
            return super.equals(obj);
        }

        @Override
        public int hashCode() {
            return super.hashCode();
        }
    }

    private static class DefaultEasyMsThreadFactory implements ThreadFactory {

        private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);

        private final ThreadGroup group;

        private final AtomicInteger threadNumber = new AtomicInteger(1);

        private final String namePrefix;

        private DefaultEasyMsThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
            this.namePrefix = "easyms-" + POOL_NUMBER.getAndIncrement() + "-thread-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(this.group, r, this.namePrefix + this.threadNumber.getAndIncrement(), 0);
            if (t.isDaemon()) {
                t.setDaemon(false);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }
}